package com.xxxx;

import com.xxxx.pojo.Person;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;


public class Hello02FlinkSetTable {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();

        DataSource<Person> dataSource = environment.readCsvFile("data/person.txt").ignoreFirstLine().pojoType(Person.class, "id", "name", "age", "score", "cls");

        BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(environment);
        batchTableEnvironment.registerDataSet("person", dataSource);

        Table person = batchTableEnvironment.scan("person");
        person.printSchema();
        Table result = person.groupBy("cls").select("cls,score.max");


        DataSet<Row> rowDataSet = batchTableEnvironment.toDataSet(result, Row.class);

        rowDataSet.print();

    }
}
